GraphQL subscription support #604

Draft
wants to merge 3 commits into
base: main

Conversation

Member

@cafca cafca commented Dec 8, 2023

Prototype for GraphQL subscriptions that lets you stream the value of a single document. It yields an update once a second.

  • Adds a WebSocket subscription endpoint
  • Adds a GraphQL query and resolver for streaming changes to a document
  • Shows an error message when the initial schema fails to build

📋 Checklist

  • Add tests that cover your changes
  • Add this PR to the Unreleased section in CHANGELOG.md
  • Link this PR to any issues it closes
  • New files contain a SPDX license header

Comment on lines +311 to +314
self.clone()
    .stream(request, session_data.unwrap_or_default())
    .flatten_stream()
    .boxed()
Member Author

These four lines!! This drove me crazy lol

Member

@adzialocha adzialocha left a comment

Wohooo! Love it! 🥰

Comment on lines +230 to +245
loop {
    let store = ctx.data_unchecked::<SqlStore>();

    match get_document_from_params(store, &document_id, &document_view_id).await? {
        Some(document) => {
            yield Ok(FieldValue::owned_any(Resolved::Document(document)));
        }
        None => {
            yield Err(Error::new("Document not found"));
            return
        }
    };

    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Member

I was thinking about push-based replication in another issue which is slightly related to this, maybe it's helpful for you:

We could introduce a "subscription service" or struct on the global Context which simply keeps a map of who's interested in what ("node c is interested in schema p", "client k is interested in document y", etc.). We know the node interests through Announcement messages (similar to search queries on a network), so that part will be quite easy to implement; client interests we will learn through new subscriptions.

The materializer service informs the subscription service about any finished materialization via the bus, by document id and by schema id.

The subscription service fires the corresponding callbacks of the subscribers, which in turn trigger the logic needed (make a db request, push the result to the client / initiate replication with a node; later we'll have "live mode" which just pushes the new data without any fancy protocol).
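
To make the flow above concrete, here is a minimal std-only sketch of the interest map and of how a finished materialization could be routed to interested parties. Every name in it (`Subscriber`, `Interest`, `Materialized`, `Action`, `Interests`) is a hypothetical stand-in, not a type from the code base, and plain strings replace the real id types. It only decides what should happen; the actual db request, push or replication is left out.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; plain strings replace real node, schema and document ids.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Subscriber {
    Node(String),
    Client(String),
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Interest {
    Schema(String),
    Document(String),
}

/// Assumed shape of what the materializer reports over the bus.
struct Materialized {
    document_id: String,
    schema_id: String,
}

/// What the subscription service would trigger for one subscriber.
#[derive(Debug, PartialEq)]
enum Action {
    InitiateReplication { node: String },
    PushResult { client: String },
}

/// "Who is interested in what", keyed by interest for cheap lookup.
#[derive(Default)]
struct Interests {
    map: HashMap<Interest, Vec<Subscriber>>,
}

impl Interests {
    fn register(&mut self, who: Subscriber, what: Interest) {
        self.map.entry(what).or_default().push(who);
    }

    /// Look up subscribers by document id first, then by schema id.
    fn on_materialized(&self, event: &Materialized) -> Vec<Action> {
        let keys = [
            Interest::Document(event.document_id.clone()),
            Interest::Schema(event.schema_id.clone()),
        ];
        keys.iter()
            .filter_map(|key| self.map.get(key))
            .flatten()
            .map(|subscriber| match subscriber {
                Subscriber::Node(id) => Action::InitiateReplication { node: id.clone() },
                Subscriber::Client(id) => Action::PushResult { client: id.clone() },
            })
            .collect()
    }
}
```

Registering "node c is interested in schema p" and "client k is interested in document y", then reporting a materialization of document y in schema p, yields a push for the client followed by a replication request for the node.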

Member

@adzialocha adzialocha Dec 9, 2023

I wanted to look at this service as part of #589 so we could tie this together at one point

Member Author

Ahh, interesting. I built a simpler integration here, but yeah, that could be upgraded to a proper subscription service. I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex. I also looked into using database queries as triggers for subscriptions, so that you register a query and get updated whenever its results change, but I haven't found anything for that in sqlx or Postgres yet.

Member

> Ahh, interesting. I built a simpler integration here

Looking already very much like that! Awesome!

Member

> I think if you are just interested in updates to a single document it's quite simple to just use what we already have, but as soon as you get into filtering and sorting it gets more complex

That will be pretty complex indeed 🤔 I think it's enough to scope things by schema id (collection query) or document id (document query) for now, even if it's ignoring the filter

Member Author

@cafca cafca Dec 11, 2023

Maybe you can tell me a bit more about what you imagine the subscription service to offer, then I can have a go at a first version. edit: actually, maybe I have an idea of what you mean

Member

@adzialocha adzialocha Dec 12, 2023

> Maybe you can tell me a bit more about what you imagine the subscription service to offer, then I can have a go at a first version. edit: actually, maybe I have an idea of what you mean

Off the top of my head (no laptop with me right now), some rough ideas that probably need improvement:

  • Keep a new struct Connections / Peers etc. in the global Context, similar to schema manager
  • The purpose of it is to manage a list of all currently connected / subscribed clients and (later) nodes, probably in the form of a map from some sort of unique subscription id to a subscription enum plus a callback list, where the enum is either a SchemaIdSet (collection queries and node announcements) or a DocumentId (document queries)
  • Since this thing lives in the global context we can reach it from anywhere: the replication service can populate it with currently known nodes (it learns about them through peer dis-/connected messages) and so can the GraphQL service (whenever a subscription starts or ends)
  • Later (not for now) we can use this state to learn how many nodes I'm currently connected to etc. (could be a protected GraphQL or public crate method)
  • Callbacks are added and removed to the map scoped by this unique id and sets of interest
  • A small subscription service is introduced (like all other services). It hooks into the message bus and waits for materializer events. Whenever a document got touched we want to know its schema id and document id (probably that's part of the message itself), so this needs to be sent from the materializer and received here
  • Like all other services, it also has access to the context and therefore to the connection table. On each incoming message it looks up whether someone is subscribed to that document or that schema and calls the callbacks accordingly
  • For nodes we probably then just want to send a message to the replication service, telling it to begin replication
  • For clients the callback probably just kicks in the subscription callback which repeats the query over the collection or document. As mentioned already I think it's fine if filters are currently ignored at this stage
  • That's probably also not part of your PRs but note for later: our replication service should know if it is in live mode with this peer already, if yes it will just send the new data to it (later just via a gossip overlay broadcast which is also just sort of subscription over a topic / schema id set), if not it will just initiate a normal 1:1 replication session as we currently already do but it would become push-based which is nice
  • Surely that subscription logic could live in another service instead of its own; I'm not sure it needs its own place already. On the other hand, it doesn't do anything clearly related to one service, and on top of that it might handle other things in the future, like automatically removing timed-out callbacks etc.
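
The registry and service described in the list above could be sketched roughly as follows, using only the standard library. This is an illustration under assumptions, not a proposed implementation: `Connections`, `SchemaIdSet` and `DocumentId` follow the names floated above, while `SubscriptionId`, `Callback`, `Touched`, `subscribe`, `unsubscribe`, `notify` and `run_service` are made up for the example, strings stand in for real ids, and a `std::sync::mpsc` channel stands in for the message bus. Filters are ignored, as suggested.

```rust
use std::collections::{BTreeSet, HashMap};
use std::sync::mpsc::Receiver;

// Hypothetical names; strings stand in for real schema and document ids.
type SubscriptionId = u64;
type Callback = Box<dyn Fn(&Touched) + Send>;

enum Interest {
    /// Collection queries and node announcements.
    SchemaIdSet(BTreeSet<String>),
    /// Document queries.
    DocumentId(String),
}

/// Assumed payload of a materializer event.
struct Touched {
    document_id: String,
    schema_id: String,
}

/// Would live in the global context, next to the schema manager.
#[derive(Default)]
struct Connections {
    next_id: SubscriptionId,
    subscriptions: HashMap<SubscriptionId, (Interest, Vec<Callback>)>,
}

impl Connections {
    /// Register a callback under a fresh id; the id is used to remove it later.
    fn subscribe(&mut self, interest: Interest, callback: Callback) -> SubscriptionId {
        let id = self.next_id;
        self.next_id += 1;
        self.subscriptions.insert(id, (interest, vec![callback]));
        id
    }

    /// Returns false if the id was unknown (for example, already removed).
    fn unsubscribe(&mut self, id: SubscriptionId) -> bool {
        self.subscriptions.remove(&id).is_some()
    }

    /// Call every callback whose interest matches; returns how many were called.
    fn notify(&self, event: &Touched) -> usize {
        let mut called = 0;
        for (interest, callbacks) in self.subscriptions.values() {
            let matches = match interest {
                Interest::SchemaIdSet(set) => set.contains(&event.schema_id),
                Interest::DocumentId(id) => *id == event.document_id,
            };
            if matches {
                for callback in callbacks {
                    callback(event);
                    called += 1;
                }
            }
        }
        called
    }
}

/// Service loop: drain materializer events until every sender is dropped.
fn run_service(connections: &Connections, bus: Receiver<Touched>) -> usize {
    bus.iter().map(|event| connections.notify(&event)).sum()
}
```

Keying the map by subscription id rather than by interest keeps removal trivial when a client disconnects, at the cost of scanning all subscriptions on every event; an index by schema id and document id could be added later if that scan becomes a bottleneck.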

Member

@sandreae sandreae left a comment

Delayed response, but YEY!! this is great :-) exciting to see subscriptions land.

I understand the next step is to have the subscription service respond to document updates, but the functionality this PR brings is already very powerful. I'm happy to merge this as is 👍

I guess tests are a pain..? all the document resolution logic which is reused here is already tested, but of course, not the actual subscription.

3 participants